DynamoDB Streamsの改めて再検証してみた

DynamoDB Streamsの改めて再検証してみた

こんにちは!コンサルティング部のキムです! 昨日は「【サーバーレス】18,570本のブログの形態素分析をやってみた」という記事を書きましたが、その中で 「100個のURLは確実にDynamoDBの中に格納されたんですが、何故か40個ほどのストリームイベントだけがトリガーされて、残りのストリームイベントはトリガーされてなかったです。他の作業をやりながら1時間以上を待っていても結果は同じでした。」 ということを書き、弊社の優秀なエンジニアたちにたくさんのコメントを貰いました。(怒られました) なので今朝、朝一何が問題だったか、DynamoDB Streamsは想定通り動くのかを確認してみましたので、その結果を共有したいと思います。 (結果として、恥ずかしいながら、100% 私のミスでした。)
Clock Icon2019.10.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!コンサルティング部のキムです!

昨日は「【サーバーレス】18,570本のブログの形態素分析をやってみた」という記事を書きましたが、その中で

「100個のURLは確実にDynamoDBの中に格納されたんですが、何故か40個ほどのストリームイベントだけがトリガーされて、残りのストリームイベントはトリガーされてなかったです。他の作業をやりながら1時間以上を待っていても結果は同じでした。」

ということを書き、弊社の優秀なエンジニアたちにたくさんのコメントを貰いました。(怒られました)

なので今朝、朝一何が問題だったか、DynamoDB Streamsは想定通り動くのかを確認してみましたので、その結果を共有したいと思います。 (結果として、恥ずかしいながら、100% 私のミスでした。)

目次

DynamoDB Streamsは本当に問題があったのか

結論から言うと、私の使い方がそもそもDynamoDB Streamsのユースケースとは異なったところがありましたが、私のユースケースにおけるDynamoDB Streamsの性能に関しては全く問題なかったということでした!!

何が問題だったかと言うと、serverless framework でデプロイする際に streamイベントを設定したんですが、batchSize のデフォルト値が10に設定されてあり、DynamoDB Streamsのトリガーから実行された Lambda 関数のイベントパラメーターの中で最大10個のイベントが含まれてあったということでした。

すごく恥ずかしいながらも、何も考えらず以下のようにコードを書いていました。


def lambda_handler(event, context):
    ...
    
    url = event['Records'][0]['NewImage']['url']['S']
    print(url)
    
    ...

この url のログは Records[0] のみ参照しているので batchSize が 1 以上になっても、何も分からないコードでした。そのくせに DynamoDB Streams はちゃんとトリガーしてくれないって書いてしまいました。

DynamoDB Streams を再検証してみる

こういう訳で、改めて DynamoDB Streams を再検証してみました。今回の作業の検証の内容は「大きい規模のストリームイベントにもちゃんとトリガーしてLambdaを実行させてくれるのかどうかを確認すること」です。

検証環境は MacOS 上で python3.7, serverless framework v1.48.0, virtualenv 16.6.1 を使いました。

使ったコードの directory から始めます。

project_root/
    handler.py
    batchWriteDynamoData.py
    create_resource.sh
    serverless.yml
    venv

先ず、プロジェクトフォルダを作ってサーバーレスフレームワークの sls create を行います。

$ mkdir dynamodb-streams-scale-test
$ cd dynamodb-streams-scale-test
$ sls create --template=aws-python3

sls create を実行すると、serverless.yml ファイルと handler.py ファイルが作成されます。DynamoDB を作る shell script を作成しました。

$ touch create_resource.sh
$ vi create_resource.sh

create_resource.sh の内容は以下のように作成します。

aws dynamodb create-table \
--table-name DDBStreamsForScaleTest \
--key-schema AttributeName=url,KeyType=HASH AttributeName=timestamp,KeyType=RANGE \
--attribute-definitions AttributeName=url,AttributeType=S AttributeName=timestamp,AttributeType=S \
--billing-mode PAY_PER_REQUEST \
--stream-specification StreamEnabled=True,StreamViewType=NEW_IMAGE

昨日作ったコードを一部活用していたので、key schema 辺りは何の意味もありません。

とりあえず DynamoDB を作ってストリームのarnを取りました。

$ sh create_resource.sh
{
    "TableDescription": {
        ...
        "StreamSpecification": {
            "StreamEnabled": true,
            "StreamViewType": "NEW_IMAGE"
        },
        "LatestStreamLabel": "2019-10-03T02:04:28.978",
        "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/DDBStreamsForLargeScale/stream/2019-10-03T02:04:28.978"
    }
}

この LatestStreamArn の値をコピーしておいて、以下の serverless.yml のstream arn の欄に記載してください。

次は serverless.yml ファイルです。

service: dynamodb-streams-scale-test

provider:
  name: aws
  runtime: python3.7
  stage: dev
  region: ap-northeast-1

functions:
  hello:
    handler: handler.hello
    memorySize: 128
    timeout: 1
    events:
      - stream: 
          arn: arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/DDBStreamsForLargeScale/stream/2019-10-03T02:04:28.978
          batchSize: 1

Lambda の ハンドラーファイルは以下のように event を出力するだけの簡単なタスクをやります。なので、timeout や memorySize は上の値で十分です。

def hello(event, context):
    print("DynamoDB Streamsを検証している。")
    print(event)
    return "success"

これができましたら、sls deploy でデプロイします。

$ sls deploy -v

準備は終わりました。あとは DynamoDB にデータを入れてみます。以下のコマンドを実行して batchWriteDynamoData.py と venv を作ります。

$ virtualenv venv
$ source venv/bin/activate
$ pip install boto3
$ touch batchWriteDynamoData.py

batchWriteDynamoData.py に以下の内容を作成します。

import boto3
import datetime
import uuid

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('DDBStreamsForScaleTest')

url_list = [ str(uuid.uuid4()) for i in range(1000) ]

try:
    with table.batch_writer() as batch:
        for url in url_list:
            url_data = {
                'url': url,
                'timestamp': datetime.datetime.utcnow().isoformat()
            }
            batch.put_item(Item=url_data)

except ClientError as e:
    print(e.response['Error']['Message'])


print('done!')

データの内容は本当に無意味です。url_list を作るときの range(20000) のところだけを検証してみたい値で置換します。私は 1,000 と 10,0000 の二回の検証をするつもりです。

$ python batchWriteDynamoData.py

done! が出たら実際に何回実行されていたのかを確認します。確認方法はCloudWatch LogsのInsightsで DynamoDB Streamsを検証している。 のメッセージの出力数を数えます。

ロググループを /aws/lambda/dynamodb-streams-scale-test-dev-hello で選択して、以下のクエリを作成します。

fields @message 
| filter @message like "DynamoDB Streamsを検証している。"
| stats count(*)

CloudWatch Logs の Insights は検索可能になれるまでの時間が少し(体感1−2分ほど)かかるので、少し待ってから検索すれば以下のような実行回数が分かります。

cloudwatch-mng-console-1000

Lambdaのモニタリング画面を見ても失敗なく1000回の実行が行ったことが分かります。

lambda-monitoring-1000

次は10,000個のデータをDynamoDBに入れてみます。batchWriteDynamoData.py の range(1000) のところを range(10000) に変えました。 もう一回の実行を行います。

$ python batchWriteDynamoData.py

前回と同様に少し待ってから CloudWatch Logs の Insights でログ検索をしました。

cloudwatch-mng-console-10000

前回のログ数と合わせて11,000になっていることが確認できました。

Lambdaのモニタリング画面もみてみます。10,000回の実行の中で失敗は一回もなく10,000回全部正常実行されていたことが分かりました。

lambda-monitoring-10000

リソース削除

今回の作業はサーバーレスのみの機能を利用し、基本的にそのまま放置しておいても追加費用はかかりませんが、常に作業環境を綺麗にしておくことを身につけておくと後でミスを防げますし、費用がかかるサービスも忘れずに削除する等良いところが多いです。

この観点でみると常にオートメーションが可能なツールを使って構築したりすると構築もより効果的であり、簡単になるんですが、リソースの削除も簡単です。今回はサーバーレスフレームワークを使ってデプロイしましたが、削除も一つのラインで終わります。

$ sls remove -v

後、DynamoDBも削除します。

$ aws dynamodb delete-table --table-name=DDBStreamsForScaleTest

以上、この二つのコマンドで検証に使った全てのリソースが削除されました。

私は aws cli も積極的に使っています。6月に弊社の奥が aws cli の発表資料を公開しておきましたので、この記事をご参考にして頂ければと思います。私もこの資料を読んで aws cli を使い始めました。

まとめ

自分がよく分からないくせにこの機能はダメだなーと考えてしまったことが恥ずかしいながらも、今度からは必ずちゃんと検証してから作業したいと思いました。

後、DynamoDB Streams はこのような規模でも良く動きますが、これは「本当に簡単なLambda関数だったから」ではないかと思います。例えば、Lambda関数の実行時間が平均1分になった場合、より効果的な並列処理のため shard 数を調整したりする必要がありますので、やはりその辺り、つまり 大規模の並列処理 のためにはもっと細かい設定等が可能な Kinesis Streams が正しいではないかと感じています。

本記事の内容がどなたかに役に立てば幸いです。

以上、コンサルティング部のキムでした。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.